LambdaからRDS/RDBを利用する際に意識したいポイント5選
CX事業本部@大阪の岩田です。私が現在関わっているプロジェクトではLambda × RDSというアーキテクチャを採用して開発を進めています。開発を進める中でLambda × RDS(RDB)という構成についてある程度ノウハウが貯まってきたので、注意したいポイントやオススメの設定をTIPS的に紹介していきます。
環境
以後の説明では以下の環境の一部もしくは組み合わせを利用しています。具体的なコードやSQLの例はプログラミング言語やDBエンジンに依存しますが、根底の考え方はどの言語、DBエンジンであっても共通するはずです。
- MySQL: 5.7
- Python: 3.7
- pymysql:0.9.3
- sqlalchemy:1.3.13
- Node.js: 12.x
- sequelize: 5.21.11
- sequelize-pool:2.3.0
接続文字列にLambdaのFunction名を含める
RDSで利用できる各種DBエンジンはDBセッションの状況が確認できるテーブルやビューを持っています。例えばOracleであればV$SESSION
を、PostgreSQLであればpg_stat_activity
を確認することで、現在どれぐらいのクライアントが接続しているのか?どういうSQLが発行されているのか?といった情報が確認できます。これらのテーブル/ビューは接続元のプログラムを識別するためのカラムを持っており、プログラムからDBに接続する際に適切なオプションを指定することで接続元プログラムの名称を反映することが可能です。例えばPythonからpymysql & sqlalchemyを利用してMySQLに接続する場合は以下のように接続文字列を指定しましょう。
user = '<DBユーザー>' password = '<パスワード>' db_name = '<データベース名>' db_host = '<ホスト名>' program_name = os.getenv('AWS_LAMBDA_FUNCTION_NAME') engine = sqlalchemy.create_engine(f'mysql+pymysql://{user}:{password}@{db_host}/{db_name}?charset=utf8&program_name={program_name}') Session = sessionmaker(bind=engine) session = Session() # ...略
接続文字列のオプションprogram_name
(具体的なオプション名はDBエンジンやライブラリ依存します)に環境変数AWS_LAMBDA_FUNCTION_NAME
から取得したLambda Functionの名前を指定するのがポイントです。こうすることで、どのLambdaがDBに対して何をしているか?を把握しやすくなります。例えば一部のLambdaにDB接続のクローズ漏れがあり、意図せずDBへの同時接続数が増えているとします。こういった状況では以下のようなSQLで原因を切り分けることが容易になります。
mysql> SELECT program_name, COUNT(*) from sys.session GROUP BY program_name; +-----------------+----------+ | program_name | count(*) | +-----------------+----------+ | mysql | 1 | | lambda_func_1 | 3 | | lambda_func_2 | 2 | | lambda_func_3 | 50 | +-----------------+----------+
もしprogram_name
を指定していなければ、犯人のLambdaを特定するのに無駄な時間を費やすことになるでしょう。接続文字列には必ずLambdaのFunction名を含め、調査しやすい環境を維持しましょう。
https://bugs.python.org/issue42967 https://github.com/python/cpython/commit/d0d4d30882fe3ab9b1badbecf5d15d94326fd13e#diff-b3712475a413ec972134c0260c8f1eb1deefb66184f740ef00c37b4487ef873eR729
DBエンジンのパラメータで最大同時接続数を調整する
各DBエンジンはmax_connections
のような同時接続数を制限するパラメータを持っています。実際のユースケースに沿ったシナリオで負荷テストを行い、結果に応じて最大同時接続数を調整しましょう。RDSのデフォルトパラメータはインスタンスサイズに応じた汎用的なパラメータ値が設定されていますが、これはあくまで汎用的な値です。Lambda × RDS(B)を選択するようなシステムではデフォルト値からある程度最大同時接続数を増やしても問題ないケースが多いはずです。
そもそも最大同時接続数というパラメータは何のために存在するのでしょうか?私は最大同時接続数というパラメータはDBにとっての安全弁のような役割だと理解しています。データベースは大別すると
- 全てのセッションで共有するメモリ領域
- セッション固有のメモリ領域
の2種類のメモリ領域を持ちます。OracleであればSGAとPGA、PostgreSQLであればshared_buffersとwork_memといった具合です。
例えば、以下の条件下での動作を考えてみましょう。実際にはメモリ以外のマシンリソースの考慮も必要ですが、考え方をシンプルにするためメモリに限定して考えます。
- マシン全体で4Gのメモリが利用可能
- OSやその他のプログラムがメモリを1G利用しており、DBは3Gのメモリが利用できる
- 全てのセッションで共有するメモリ領域として 2Gを設定
- セッション固有のメモリ領域として最大10Mを設定
同時に200クライアントがDBに接続し、それぞれのクライアントがセッション固有のメモリ領域を10M分フル利用した場合、メモリの使用量は2G + (200クライアント × 10M) となります。DBが利用可能な3Gを超過し、マシン全体でメモリが不足することが分かります。こうなるとスワップが発生し、パフォーマンスは著しく低下するでしょう。最大同時接続数を100に制限しておけば、制限を超過した100クライアントが接続エラーとなりますが、DB全体のパフォーマンス低下は抑制することができます。接続エラーとなったクライアントに関しても、少し時間を空けてからリトライしてもらえばスムーズに処理できる可能性が上がります。システム全体のスループットを悪化させないための安全弁として最大同時接続数のパラメータが機能するわけです。
ワークロードの特性的にセッション固有のメモリ領域を大量に消費することが予想される場合は最大同時接続数を下げるべきですが、逆にセッション固有のメモリ領域をあまり消費しないのであれば最大同時接続数のパラメータを増やし、より多くの接続を同時に受け付けても問題ないでしょう。
都度接続か、接続維持かを検討する
Lambdaの実行毎にRDS(B)に接続するのか、コールドスタート時のみ接続してウォームスタート時には確立済みの接続を利用するのか検討しましょう。
都度接続の場合は以下のようなコードになります。
def lambda_handler(event, context): # DB接続処理 # メインのロジック # 接続クローズ処理
接続維持の場合は以下のようなコードになります。
# DB接続処理 def lambda_handler(event, context): # メインのロジック
接続維持の場合ウォームスタート時には接続処理をスキップできるため、パフォーマンスの向上が期待できます。しかし、Lambda側から明示的に接続をクローズするポイントが無いことに注意が必要です。Lambda <-> RDS(B)間の接続は以下のようにオープン/クローズされることになります。
- Lambdaが頻繁に呼び出される場合
- 1回呼び出された後は実行環境が破棄されるまでの間2度と呼び出されない場合
いずれの場合であっても、Lambdaが最後に呼び出されてから実行環境が破棄されるまでの間、確立済みのDBセッションは何も仕事をしない訳です。
問題が出る具体例を考えてみましょう。
- FuncA、FuncBという2つのLambdaでシステムが構成されている
- FuncAは滅多に起動しないが、起動するタイミングでは一気に同時起動する
- FuncBはスパイクアクセスは発生しないが、ユーザーから定期的にアクセスされる
- 各LambdaはRDS(B)を利用し、実装はDB接続を維持するような実装になっている
- RDS(B)の最大同時接続数は100
この条件下でFuncAが起動して99個のLambda実行環境が起動したとします。FuncAの実行環境とRDS(B)の間に99個の接続がオープンした後、すぐにFuncAへのアクセス数は0まで落ち込みFuncAの実行環境はアイドル状態となります。FuncBに3名のユーザーがアクセスしてきました。FuncAの実行環境が3個起動し、RDS(B)に接続しに行きますが、FuncAの実行環境が99本のDBセッションを利用しているため、最大同時接続数超過のエラーでFuncBの実行は2/3が異常終了してしまいます。
接続維持パターンは同時接続数という限られたリソースを有効活用し辛くなるというデメリットがある訳です。私のプロジェクトでは処理の特性に応じてLambda Functionごとに都度接続と接続維持のパターンを使い分けて利用するようにしています。個人的なオススメは都度接続のパターンです。パフォーマンス面では少しデメリットがありますが、同時接続数の管理をシンプルに考えることができるからです。もしも都度接続によるオーバーヘッドが許容できない場合...それはLambdaを選定すべきユースケースではないのかもしれません。
DB側のパラメータを調整し、アイドル時は自動切断させる
接続維持パターンが問題となるのは、接続を確立したLambda実行環境がアイドル状態にも関わらずDB接続を確立しっぱなしになっていることが原因です。Lambda実行環境がリース期間を迎えるまでの間忙しく仕事をし続けるのであれば、DB接続を確立し続けるのは当然の権利であり、都度接続のアーキテクチャをとっても同時接続数が減ることはありません。
前述の通り接続維持パターンをとるのであれば、Lambdaからは明示的にDB接続をクローズするタイミングがないので、DB側のパラメータ調整で対応しましょう。MySQLであればwait_timeout
を短めの値に調整することで、Lambdaが一定期間アイドル状態になった場合にDB側から接続をクローズすることが可能になり、アイドル状態のLambda実行環境が同時接続数を食い潰すという問題が緩和されます。
ちなみに以前検証したことがあるのですが、TCPのKeep Alive系パラメータを使ってうまく自動切断させることはできないようです。
DBアクセスを伴う処理には再接続ロジックを組み込む
前述のようにアイドル時の自動切断を調整した場合、コールドスタート時に確立済みだったDBセッションが次回のウォームスタート時に既に切断されている可能性があります。例としてwait_timeout
に60秒を設定したMySQLに対して以下のLambdaを実行してみましょう。
conn = pymysql.connect(...略) def lambda_handler(event, context): try: with conn.cursor() as cursor: sql = "SELECT now()" cursor.execute(sql) result = cursor.fetchall() print(result)
コールドスタート時や、前回実行時から1分未満で実行されたウォームスタートの場合は特に問題ありません。しかし、1分を超える間隔を空けてからウォームスタートで実行された場合は以下のようなエラーが発生します。
[ERROR] 2020-06-07T11:48:24.17Z 24303bf4-b066-4222-8b1f-0deb2d70a53f (2013, 'Lost connection to MySQL server during query') [ERROR] OperationalError: (2013, 'Lost connection to MySQL server during query') Traceback (most recent call last): File "/var/task/lambda_function.py", line 38, in lambda_handler raise e ...略
対策としてDBアクセス処理には必要に応じて再接続を行うロジックを組み込みましょう。次のトピックにも関連しますが、何かしらのライブラリを利用している場合は接続リトライの機構が入っていることが多いでしょう。
ライブラリの特性を理解して使う
LambdaからRDS(B)にアクセスする場合、通常は何かしらのライブラリを利用すると思います。ライブラリを利用することで面倒な諸々の処理が隠蔽され、LambddaからRDS(B)にアクセスすることが簡単になります。ここで注意したいのが、ライブラリが裏でやってくれている諸々の処理です。各種のライブラリはLambda環境で利用するために開発されたものではなく、何も気にせずにデフォルトの設定で利用していると問題を引き起こす可能性があります。特にコネクションプーリング周りの挙動を理解しないまま利用すると意図せずDBの同時接続数を食い潰しかねません。具体例を見てみましょう。
例1.Node.js × Sequelizeの場合
例えば以下のコードを実行した時、レスポンスが返ってくるまでの所要時間はどの程度だと思いますか?MySQLに接続してselect 1
するだけのコードです。
const { Sequelize } = require('sequelize'); exports.handler = function(event, context, callback) { const sequelize = new Sequelize('<DB名>', '<ユーザー名>', '<パスワード>', { host: '<ホスト名>', dialect: 'mysql', dialectOptions: { connectAttributes: { program_name: process.env.AWS_LAMBDA_FUNCTION_NAME } } } ); sequelize.query('select 1', {type: sequelize.QueryTypes.SELECT}) .then(data => callback(null, data)); }
Lambdaのメモリ割り当てやAZ跨ぎの有無で多少変動しますが、まあ100msもあれば終わりそうな処理です。なんと実際には実行~レスポンス返却まで10秒弱の時間が必要です。この事象はSequelizeの仕様とNode.jsの「非同期ではないハンドラー」の仕様によって引き起こされます。
「非同期ではないハンドラー」に関して公式ドキュメントには以下のように記載されています。
非ハンドラーではない関数の場合は、イベントループが空になるか、関数がタイムアウトするまで、関数の実行が継続されます。レスポンスは、すべてのイベントループタスクが完了するまで、呼び出し元に送信されません。関数がタイムアウトした場合は、エラーが返ります。すぐにレスポンスが返るようにランタイムを設定するには、context.callbackWaitsForEmptyEventLoopを false に設定します。
Sequelizeは内部的にコネクションプーリングの機構を利用しており、上記のコードでDBにアクセスすると内部的にはコネクションのプールを作成します。プーリングされたコネクションはoptions.pool.idle
というパラメータで指定された時間だけアイドル状態の場合にクローズされます。そして、このクローズ処理はsequelize-poolの内部で以下のようにsetTimeout
を利用して実装されています。
_scheduleRemoveIdle() { if (!this._removeIdleScheduled) { this._removeIdleScheduled = true; this._removeIdleTimer = setTimeout(() => { this._removeIdle(); }, this._factory.reapInterval); } }
先ほどの公式ドキュメントの説明に当てはめると、SQLを実行した直後はsetTimeout
でスケジュールされた処理が実行されずておらず、すべてのイベントループタスクが完了
していないため、レスポンスが呼び出し元に送信されないのです。
対策としては
- context.callbackWaitsForEmptyEventLoopをfalseに設定する
- 非同期ハンドラーを使う
- SQL発行後に明示的に
sequelize.close()
を入れることで、自動クローズを待たずに全てのDB接続をクローズする options.pool.idle
の値を1などの小さな値に設定する
といった対策が考えられます。それぞれ考慮事項やメリット/デメリットはあるかと思いますが、今回はこれ以上深堀りしません。
例2.Python × SQLAlchemyの場合
もう1つ具体例を見てみましょう。Python × SQLAlchemyの組み合わせでRDS(B)に接続するケースです。
import sqlalchemy from sqlalchemy.orm import sessionmaker import os import json def lambda_handler(event, context): user = '<ユーザー名>' password = '<パスワード>' db_name = '<DB名>' db_host = '<ホスト名>' program_name = os.getenv('AWS_LAMBDA_FUNCTION_NAME') engine = sqlalchemy.create_engine(f'mysql+pymysql://{user}:{password}@{db_host}/{db_name}?charset=utf8;program_name={program_name}') Session = sessionmaker(bind=engine) session = Session() session.execute('select * from information_schema.tables') session.close() return { 'statusCode': 200, 'body': json.dumps('Hello from Lambda!') }
このコードを実行した後、Lambda実行環境<->RDS(B)間のセッションはどうなっているでしょうか?session.close()しているにも関わらず、セッションは切断されずに残り続けます。
SQLAlchemyもSequelizeと同様に内部的にコネクションプーリングの機構を備えています。session.close()
はプログラムから利用しているセッションをプールに返却するだけで、実際にはLambda実行環境<->RDS(B)間のセッションは切断しません。 処理の前後にprint(session.bind.pool.status())
を入れると良く分かります。
# ...略 print(session.bind.pool.status()) session.execute('select * from information_schema.tables') print(session.bind.pool.status()) session.close() print(session.bind.pool.status())
出力は以下のようになります。
Pool size: 5 Connections in pool: 0 Current Overflow: -5 Current Checked out connections: 0 Pool size: 5 Connections in pool: 0 Current Overflow: -4 Current Checked out connections: 1 Pool size: 5 Connections in pool: 1 Current Overflow: -4 Current Checked out connections: 0
execute
前はプールされたコネクションは存在せず、execute
実行時にDBとの接続を確立してプールの管理下へ。session.close()
した段階でチェックアウトしていたコネクションをプールに返却という流れで動作していることが分かります。プールに返却するだけでなく、プールされたコネクションを閉じたい場合は以下のよにsession.bind.dispose()
を入れましょう。
...略 print(session.bind.pool.status()) session.execute('select * from information_schema.tables') print(session.bind.pool.status()) session.close() print(session.bind.pool.status()) session.bind.dispose() print(session.bind.pool.status())
実行結果は以下のようになります。
Pool size: 5 Connections in pool: 0 Current Overflow: -5 Current Checked out connections: 0 Pool size: 5 Connections in pool: 0 Current Overflow: -4 Current Checked out connections: 1 Pool size: 5 Connections in pool: 1 Current Overflow: -4 Current Checked out connections: 0 Pool size: 5 Connections in pool: 0 Current Overflow: -5 Current Checked out connections: 0
Lambda実行後にDB側から確認してもセッションがクローズされていることが分かります。
また、SQLAlchemyはコネクションプーリングの実装として複数の選択肢を用意してくれています。
システムのバックエンドにLambdaを採用した場合、アプリの実行環境に同時にルーティングされるユーザーのリクエストは1つだけです。EC2やECSであれば実行環境1に対してNリクエストが流れるので、アプリ側で複数のDBセッションをプーリングするのは有効な選択肢ですが、Lambdaの場合はアプリ側で複数のDBセッションをプーリングするメリットはなく、必要なDBセッションはLambda実行環境毎に1つというユースケースがほとんどでしょう。デフォルトのQueuePoolではなくDBセッションをシングルトンとして利用できるStaticPoolを利用することをオススメします。仮に複数のDBセッションが必要でQueuePoolを利用する場合でも、デフォルトのpool_size
である5は過剰になるでしょう。ユースケースに応じたpool_size
を指定しましょう。QueuePoolを使用する場合でも妥当なpool_size
はせいぜい1か2でしょう。先ほど例にあげたSequelizeの場合も同様にoptions.pool.max
を減らしておくと良いでしょう。
※ログ用のテーブルにログを書き込む設計で、メインのトランザクションがロールバックされた場合もログは残しておきたい といったユースケースではメインのDBセッションとログ用のDBセッションを使い分けるようなケースがあるかもしれません。Lambdaでやる以上はログの出力先を再検討した方が良いかもしれませんが。
X-Rayの利用を検討する
RDS(B)の利用に限った話ではないですが、X-Rayによるトレースを仕込んでおくと何かと調査が捗ります。X-Rayの利用と、どのように利用するのが効果的かを検討しましょう。例えばPython × SQLalchemyの場合、aws_xray_sdk.ext.sqlalchemy.query
パッケージを利用することでSQLalcemyのクエリ操作に対して透過的にX-Rayのトレースを仕込むことができます。Python × SQLalchemyでの X-Ray利用例です。XRaySessionMaker
を利用してSQLAlchemyのsessionを生成します。
from aws_xray_sdk.core import xray_recorder from aws_xray_sdk.ext.sqlalchemy.query import XRaySessionMaker import sqlalchemy from sqlalchemy.orm import sessionmaker import os import json xray_recorder.begin_segment('SQLAlchemyTest') from sqlalchemy.ext.declarative import declarative_base Base = declarative_base() class User(Base): __tablename__ = 'users' ...略 def lambda_handler(event, context): ...略 engine = sqlalchemy.create_engine(f'mysql+pymysql://{user}:{password}@{db_host}/{db_name}?charset=utf8;program_name={program_name}') Session = XRaySessionMaker(bind=engine) session = Session() users = session.query(User).filter(User.id != 1).filter(User.name != 'hoge').order_by(User.id).all() for u in users: print(u) session.close() xray_recorder.end_segment() return { 'statusCode': 200, 'body': json.dumps('Hello from Lambda!') }
このコードを実行した際のX-Rayのトレース結果です。
SQLAlchemy関連のトレース情報が色々と出力されていることが分かります。
このように実行されたSQLまで確認することが可能です。ただaws_xray_sdk.ext.sqlalchemy.query
を使うと、SQL発行部分以外にもqueryクラスの諸々の処理がトレースされ、かなりノイズが多くなる印象です。スロークエリに関してはRDS(B)側のログで追いかけられますし、aws_xray_sdk.ext.sqlalchemy.query
は利用せずに、トレースしたいクエリ処理に対して自前でアノテーションを付与していく方が利用しやすいかもしれません。
# ...略 engine = sqlalchemy.create_engine(f'mysql+pymysql://{user}:{password}@{db_host}/{db_name}?charset=utf8;program_name={program_name}') Session = sessionmaker(bind=engine) session = Session() @xray_recorder.capture('query_user') def query_user(): return session.query(User).filter(User.id != 1).filter(User.name != 'hoge').order_by(User.id).all() def lambda_handler(event, context): users = query_user() for u in users: print(u) session.close() xray_recorder.end_segment() return { 'statusCode': 200, 'body': json.dumps('Hello from Lambda!') }
こちらのコードを実行した結果です。
先ほどに比べるとかなり出力がスッキリしました。SQL実行部分の処理時間も取得できているので、情報量としてはこれぐらいで十分かもしれませんね。このあたりはまだ色々と試行錯誤しているところです。
まとめ
VPC Lambdaのコールドスタート問題が解消し、以前に比べるとLambdaからRDS(B)を利用するという選択肢が取りやすくなりましたが、それでもDynamoDBのオンデマンドモード利用に比べると注意すべきポイントは多いように感じます。もしLambda × RDS(B)というアーキテクチャを採用する場合は今回ご紹介したトピックを活用して頂ければ幸いです。